fix: prevent premature stream closure in EventConsumer grace period #764

Draft
kabir wants to merge 5 commits into a2aproject:main from kabir:fix-local-handling-race-condition

Conversation

kabir (Collaborator) commented Mar 23, 2026

The EventConsumer grace period logic could close streams prematurely when final events were still in-transit through MainEventBusProcessor. This manifested as intermittent test failures where the stream would close before all events were delivered.

Root Cause:

  • When agent execution completes, EventConsumer enters a grace period
  • It polls the ChildQueue with 500ms timeout, allowing 3 consecutive timeouts (1.5s total) before closing the stream
  • The original logic only checked queue.size() == 0
  • However, final events can be in-transit: MainQueue → MainEventBus → MainEventBusProcessor → ChildQueue
  • This timing window (typically <500ms) could result in premature closure when the local queue was empty but the final event hadn't arrived yet

Solution:

  • Added EventQueue.isAwaitingFinalEvent() method
  • MainQueue calls child.expectFinalEvent() when enqueueing final events
  • EventConsumer checks awaitingFinalEvent flag before starting timeout counter: agentCompleted && queueSize == 0 && !awaitingFinal
  • ChildQueue clears the flag when ANY event is dequeued (not just final events, to handle the timing window simply)
  • This ensures the grace period doesn't start counting down while a final event is still being distributed

The fix handles both local execution (events available immediately) and replicated scenarios (events may arrive via Kafka with delays).
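
For readers skimming the thread, the grace-period condition described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: the class GracePeriodSketch, its method names, and the choice to leave the counter unchanged while a final event is awaited are assumptions. Only the condition agentCompleted && queueSize == 0 && !awaitingFinal and the limit of 3 timeouts come from the description.

```java
// Hypothetical sketch of the grace-period decision; names are illustrative only.
final class GracePeriodSketch {
    // From the PR description: 3 consecutive 500ms poll timeouts (1.5s total).
    static final int MAX_CONSECUTIVE_TIMEOUTS = 3;

    /**
     * Called after a poll on the child queue timed out without an event.
     * Returns the updated timeout count.
     */
    static int onPollTimeout(boolean agentCompleted, int queueSize,
                             boolean awaitingFinal, int timeouts) {
        // Count the timeout only when the agent is done, nothing is queued,
        // and no final event is still in transit to this queue.
        if (agentCompleted && queueSize == 0 && !awaitingFinal) {
            return timeouts + 1;
        }
        // Assumption: otherwise the count is left as it was.
        return timeouts;
    }

    /** The stream is closed once enough consecutive timeouts were counted. */
    static boolean shouldClose(int timeouts) {
        return timeouts >= MAX_CONSECUTIVE_TIMEOUTS;
    }
}
```

With the old check (queue size only), the second branch would not exist, so an empty local queue alone would start the countdown even if a final event were still on its way.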

@gemini-code-assist (Contributor)

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses an issue where the EventConsumer was prematurely closing streams during the grace period, before all final events were fully processed. The fix introduces a mechanism to track whether a final event is still in transit, preventing the grace period timeout from starting until all events have been delivered.

Highlights

  • Premature Stream Closure: The EventConsumer grace period logic could close streams prematurely when final events were still in-transit, leading to intermittent test failures.
  • Root Cause Analysis: The original logic only checked queue.size() == 0, but final events could still be in-transit through MainEventBusProcessor.
  • Solution: Implemented EventQueue.isAwaitingFinalEvent() and updated EventConsumer to check this flag before starting the timeout counter, ensuring the grace period doesn't start prematurely.
Ignored Files
  • Ignored by pattern: .github/workflows/** (1)
    • .github/workflows/test-event-race-condition-loop.yml

gemini-code-assist bot (Contributor) left a comment

Code Review

The pull request addresses a race condition that could cause premature stream closure in EventConsumer. The solution, which involves introducing an awaitingFinalEvent flag to delay the grace period timeout, is well-reasoned and correctly implemented in EventConsumer. However, there is a potential concurrency issue in EventQueue.ChildQueue with the new flag that should be addressed to ensure thread safety. The review includes a comment with a high-severity finding to address this potential data race, recommending the use of volatile for thread safety.

Comment on lines +786 to +788
public boolean isAwaitingFinalEvent() {
    return awaitingFinalEvent;
}

high

The awaitingFinalEvent flag is accessed from multiple threads without ensuring visibility of changes. The EventConsumer thread reads the flag via this isAwaitingFinalEvent() method, while another thread writes to it when an event is enqueued. This can lead to a race condition where the consumer thread reads a stale value.

To guarantee that writes to awaitingFinalEvent are visible to other threads, it should be declared as volatile. Using AtomicBoolean with compareAndSet() is not strictly necessary, provided the flag is only ever set and cleared with plain assignments rather than updated through a compound check-then-act operation.

For example, in ChildQueue:

private volatile boolean awaitingFinalEvent = false;

This will establish a proper happens-before relationship and ensure the fix is thread-safe.
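
As a rough illustration of the suggestion, a child queue with a volatile flag might look like the sketch below. The class ChildQueueSketch, the String event type, and the non-blocking dequeue() are assumptions made to keep the example small; only the flag name and the methods expectFinalEvent() and isAwaitingFinalEvent() appear in this PR. The sketch clears the flag on any dequeued event, as the PR description states; a later commit message describes clearing it only for final events.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the child queue; not the project's actual class.
final class ChildQueueSketch {
    private final Queue<String> events = new ConcurrentLinkedQueue<>();

    // volatile: a write by the distributing thread is visible to the
    // consumer thread that later reads the flag.
    private volatile boolean awaitingFinalEvent = false;

    /** Called by the producer side when a final event is on its way. */
    void expectFinalEvent() {
        awaitingFinalEvent = true;
    }

    boolean isAwaitingFinalEvent() {
        return awaitingFinalEvent;
    }

    void enqueue(String event) {
        events.add(event);
    }

    /** Returns the next event, or null if none is queued. */
    String dequeue() {
        String event = events.poll();
        if (event != null) {
            // Plain assignment only, so volatile is enough here.
            awaitingFinalEvent = false;
        }
        return event;
    }
}
```

If the flag ever needed a read-modify-write step (for example, "clear only if currently set, and report whether it was"), AtomicBoolean would be the safer choice.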

References
  1. When a flag is shared between threads but only written with plain, independent assignments, volatile is sufficient for visibility and correctness. Using AtomicBoolean with compareAndSet() is a general recommendation for shared mutable state that needs atomic compound updates, but it's not strictly necessary in this case.

kabir force-pushed the fix-local-handling-race-condition branch from c10c262 to 2d7573a (March 23, 2026 14:31)
kabir marked this pull request as draft (March 23, 2026 15:10)
kabir force-pushed the fix-local-handling-race-condition branch from 2d7573a to 3d3d8d0 (March 23, 2026 15:29)
kabir added 2 commits March 23, 2026 15:49
The EventConsumer grace period logic could close streams prematurely when
final events were still in-transit through MainEventBusProcessor. This
manifested as intermittent test failures where the stream would close
before all events were delivered.

Root Cause:
- When agent execution completes, EventConsumer enters a grace period
- It polls the ChildQueue with 500ms timeout, allowing 3 consecutive
  timeouts (1.5s total) before closing the stream
- The original logic only checked queue.size() == 0
- However, final events can be in-transit: MainQueue → MainEventBus →
  MainEventBusProcessor → ChildQueue
- This timing window (typically <500ms) could result in premature closure
  when the local queue was empty but the final event hadn't arrived yet

Solution:
- Added EventQueue.isAwaitingFinalEvent() method
- MainQueue calls child.expectFinalEvent() when enqueueing final events
- EventConsumer checks awaitingFinalEvent flag before starting timeout
  counter: agentCompleted && queueSize == 0 && !awaitingFinal
- ChildQueue clears the flag only when a FINAL event is dequeued (not on
  any event, to avoid clearing it too early when non-final events arrive)
- This ensures the grace period doesn't start counting down while a final
  event is still being distributed

The fix handles both local execution (events available immediately) and
replicated scenarios (events may arrive via Kafka with delays).

Add GitHub Actions workflow to run the intermittent tests 100 times
across all transports (REST, JSON-RPC, gRPC) and JDK versions (17, 21, 25).

Tests verified:
- testAgentToAgentLocalHandling
- testNonBlockingWithMultipleMessages
- testAuthRequiredWorkflow

The workflow stops on first failure and uploads surefire reports for
debugging. This is a temporary workflow to validate the fix and will be
removed once verified on CI.
kabir force-pushed the fix-local-handling-race-condition branch from 3d3d8d0 to 714b1f1 (March 23, 2026 15:56)
kabir and others added 3 commits March 23, 2026 16:21
The timeout logic was setting a local variable to false, but the next
iteration would read true again from queue.isAwaitingFinalEvent(),
causing the grace period to never start. Added clearAwaitingFinalEvent()
method to ChildQueue to properly clear the flag on the queue itself.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
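
The bug described in this commit message can be reproduced in a small simulation. The sketch below is an assumption about the general shape of the loop, not the actual EventConsumer code: FlagQueueSketch, TimeoutLoopSketch, and pollsUntilClose are hypothetical names, and each loop iteration stands in for one poll that timed out. Only isAwaitingFinalEvent(), clearAwaitingFinalEvent(), and the limit of 3 timeouts come from the PR.

```java
// Hypothetical queue holding just the flag; not the project's ChildQueue.
final class FlagQueueSketch {
    private volatile boolean awaitingFinalEvent;

    void expectFinalEvent() { awaitingFinalEvent = true; }
    boolean isAwaitingFinalEvent() { return awaitingFinalEvent; }
    void clearAwaitingFinalEvent() { awaitingFinalEvent = false; }
}

final class TimeoutLoopSketch {
    static final int MAX_TIMEOUTS = 3;

    /**
     * Simulates up to maxPolls timed-out polls. Returns the poll number at
     * which the stream would close, or -1 if it never closes.
     */
    static int pollsUntilClose(FlagQueueSketch queue, boolean clearOnQueue, int maxPolls) {
        int timeouts = 0;
        for (int poll = 1; poll <= maxPolls; poll++) {
            // The flag is re-read from the queue on every iteration.
            boolean awaitingFinal = queue.isAwaitingFinalEvent();
            if (awaitingFinal) {
                if (clearOnQueue) {
                    // Fixed behaviour: clear the flag on the queue itself.
                    queue.clearAwaitingFinalEvent();
                } else {
                    // Buggy behaviour: only the local copy changes, so the
                    // next iteration reads true again.
                    awaitingFinal = false;
                }
                continue;
            }
            timeouts++;
            if (timeouts >= MAX_TIMEOUTS) {
                return poll;
            }
        }
        return -1;
    }
}
```

In this simulation the buggy variant never reaches the timeout counter while the flag stays set, which matches the "grace period never starts" symptom; the fixed variant gives up waiting after one poll and then counts three timeouts.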